*/ private readonly FutureIteratorQueue $queue; private readonly Cancellation $cancellation; private readonly string $cancellationId; /** @var Future|Future|null */ private ?Future $complete = null; public function __construct(?Cancellation $cancellation = null) { $this->queue = $queue = new FutureIteratorQueue(); $this->cancellation = $cancellation ?? new NullCancellation(); $this->cancellationId = $this->cancellation->subscribe(static function (\Throwable $reason) use ($queue): void { if ($queue->suspension) { $queue->suspension->throw($reason); $queue->suspension = null; } }); } /** * @param FutureState $state * @param Tk $key * @param Future $future */ public function enqueue(FutureState $state, mixed $key, Future $future): void { if ($this->complete) { throw new \Error('Iterator has already been marked as complete'); } $queue = $this->queue; // Using separate object to avoid a circular reference. /** * @param Tv|null $result */ $handler = static function (?\Throwable $error, mixed $result, string $id) use ( $key, $future, $queue ): void { unset($queue->pending[$id]); if ($queue->suspension) { $queue->suspension->resume([$key, $future]); $queue->suspension = null; return; } $queue->items[] = [$key, $future]; }; $id = $state->subscribe($handler); $queue->pending[$id] = $state; } public function complete(): void { if ($this->complete) { throw new \Error('Iterator has already been marked as complete'); } $this->complete = Future::complete(); if (!$this->queue->pending && $this->queue->suspension) { $this->queue->suspension->resume(); $this->queue->suspension = null; } } public function error(\Throwable $exception): void { if ($this->complete) { throw new \Error('Iterator has already been marked as complete'); } $this->complete = Future::error($exception); if (!$this->queue->pending && $this->queue->suspension) { $this->queue->suspension->throw($exception); $this->queue->suspension = null; } } /** * @return null|array{Tk, Future} */ public function consume(): ?array { if ($this->queue->suspension) { throw new \Error('Concurrent consume() operations are not supported'); } if (!$this->queue->items) { if ($this->complete && !$this->queue->pending) { return $this->complete->await(); } $this->cancellation->throwIfRequested(); $this->queue->suspension = EventLoop::getSuspension(); /** @var null|array{Tk, Future} */ return $this->queue->suspension->suspend(); } $key = \array_key_first($this->queue->items); $item = $this->queue->items[$key]; unset($this->queue->items[$key]); /** @var null|array{Tk, Future} */ return $item; } public function __destruct() { $this->cancellation->unsubscribe($this->cancellationId); foreach ($this->queue->pending as $id => $state) { $state->unsubscribe($id); } } } __halt_compiler();----SIGNATURE:----m6fVWKtnWMUckWJmFXmnNKz1A7/ieDXwcOMWyQh4lJscbmj/oYiHNmGcvRFSJtlhwEojX1THoh23Wfc4F3Q0KcvAcjtcEFxWnV6LhWtnrNLqew9+7Olu3WHnHANqpWbyhYCDqGfW9QvOwZHo2l4ii+VLbXax6XtZVt9A1px86OPfDhX1Xd/EATz74MDlidzpfiaI0KVV4Ya6DZQLis6SPjIq9sqHlGTPluJbrlOjwll7KPVUC3WHh4TGEHUsML6FmdUzVBL11ulDHQ9JjrTH3bkNZz+9FkxDI1gR+YwSvahGe3rmWPkzpPxJ+ejdT82Lymifzze2HIPvQDZAOEWulgQAVFhlcQextpIyXEzyaHajDlmenLVndP5FJy7ZOObS4Dh5BYFH6AIOZ1BzkmozWTLaFS9dSn/dDkkDB6usqaIkVj0I411Wf5hikYfbJv+4PSJHrguoW10U6Gf/+IN6td4E6ck0LPeE6hOYtboKU+/noaGMTLhv3SwbR2fVjqlzwpDZtgE/lPZWTuVf0wAU93Gwn7Cg5lJSDJEgQV2VcdkmkdNEI/UWQNhRV48aJZhVzj5RlL9NFRoVKoJFibazA6+huVRcPeBnGv8xnsOF1KxGpxPaTJslGii5/hWSZVW65ogh2tv66BjXA9tISR2TMIqphEEXROt+OVJdkyi0388=----ATTACHMENT:----ODcwODU0NjMyMjY5OTc3NCAyNjM0NTgwNzY5MDIwMzA1IDYzODA4ODUxMjI2ODU3MzQ=